package springboot.config;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;

import javax.annotation.Resource;
import java.util.Map;

/**
 * Spring configuration that registers the Kafka consumer and producer factory beans.
 *
 * <p>Both factories use plain String keys and JSON values; the JSON (de)serializers
 * are built around the {@link ObjectMapper} injected into this class.
 */
@Configuration
public class ConsumerKafkaConfig {

    /** Jackson mapper handed to the JSON serializer and deserializer created below. */
    @Resource
    private ObjectMapper objectMapper;

    /**
     * Deserialization side: creates the consumer factory.
     *
     * @param properties Kafka properties from which the consumer configuration map is built
     * @return a consumer factory whose keys are deserialized as Strings and whose values
     *         are deserialized as JSON using the injected {@link ObjectMapper}
     */
    @Bean
    public DefaultKafkaConsumerFactory<?, ?> cf(KafkaProperties properties) {
        Map<String, Object> consumerConfig = properties.buildConsumerProperties();
        // Keys are read as plain Strings.
        StringDeserializer keyDeserializer = new StringDeserializer();
        // Values are read as JSON.
        JsonDeserializer<Object> valueDeserializer = new JsonDeserializer<>(objectMapper);
        return new DefaultKafkaConsumerFactory<>(consumerConfig, keyDeserializer, valueDeserializer);
    }

    /**
     * Serialization side: creates the producer factory.
     *
     * @param properties Kafka properties from which the producer configuration map is built
     * @return a producer factory whose keys are serialized as Strings and whose values
     *         are serialized as JSON using the injected {@link ObjectMapper}
     */
    @Bean
    public DefaultKafkaProducerFactory<?, ?> pf(KafkaProperties properties) {
        Map<String, Object> producerConfig = properties.buildProducerProperties();
        // Keys are written as plain Strings.
        StringSerializer keySerializer = new StringSerializer();
        // Values are written as JSON.
        JsonSerializer<Object> valueSerializer = new JsonSerializer<>(objectMapper);
        return new DefaultKafkaProducerFactory<>(producerConfig, keySerializer, valueSerializer);
    }

}